#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>

#define DBG_SUBSYS S_LIBTASK

#include "cluster.h"
#include "chunk.h"
#include "lich_md.h"
#include "net_table.h"
#include "configure.h"
#include "../replica/replica.h"
#include "disk.h"
#include "../../storage/controller/pool_ctl.h"
#include "../../storage/controller/volume_ctl.h"
#include "../../storage/controller/stor_ctl.h"
#include "../../storage/controller/md_proto.h"
#include "nodectl.h"
#include "recovery.h"
#include "types.h"
#include "token_bucket.h"
#include "dbg.h"
#include "utils.h"
#include "system.h"
#include "sysutil.h"
#include "etcd.h"
#include "net_global.h"
#include "locator_rpc.h"
#include "node.h"
#include "string_buffer.h"
#include "diskmd_recovery.h"


/**
 * @file Recovery Module
 *
 * 功能需求：
 * - 在处理过程中，某资源被删除，要及时感知到，并停止处理
 * - 能够处理增加/删除存储池的情况
 *
 * 若干设计目标和准则：
 * - 及时性
 * - 效率
 * - QoS，在恢复流量和业务IO流量之间，达到均衡，可控
 * - safety
 * - liveness，过程可以终止
 *
 * 控制参数：
 * - /opt/fusionstack/data/recovery/<pool>/thread
 * - /opt/fusionstack/data/recovery/<pool>/fill_rate
 * - /opt/fusionstack/data/recovery/<pool>/scale
 * - /opt/fusionstack/data/recovery/<pool>/recovery_total [out]
 * - /opt/fusionstack/data/recovery/<pool>/disk_recovery_total [out]
 * - /opt/fusionstack/data/recovery/<pool>/task_max [x]
 * - /opt/fusionstack/data/recovery/<pool>/qos_mode [x]
 *
 * - /dev/shm/lich4/nodectl/recovery/<pool>/interval
 * - /dev/shm/lich4/nodectl/recovery/<pool>/immediately [debug]
 * - /dev/shm/lich4/nodectl/recovery/<pool>/recovery_node_online [debug]
 *
 * - /dev/shm/lich4/nodectl/recovery/<pool>/disk_switch [x]
 * - /dev/shm/lich4/nodectl/recovery/<pool>/commit_max
 *
 * - /dev/shm/lich4/nodectl/recovery/<pool>/info [out]
 * - /dev/shm/lich4/nodectl/recovery/<pool>/disk_info [out]
 *
 * 恢复过程：按存储池恢复，对于每一个存储池：
 * - 扫描所有属于该存储池的metadata，include pool, volume and snapshot，同时跳过处于删除状态的资源
 * - 扫描结果存入一临时文件
 * - mmap临时文件, 多线程处理每一个chunk
 *
 * 每一个chunk，有几种情况需要恢复：
 * - 副本不一致 (md_chunk_check)
 * - 副本不足 (md_chunk_move)，少变多
 * - 副本过剩（多变少，目前不支持）
 *
 * 处理的故障类型：
 * - 拔盘 (标记chkinfo->diskid[i].status为__S_CHECK)
 * - 断节点
 * - 网络不可达
 *
 * 需要感知的主要变化有：
 * - 磁盘
 * - 集群节点变化
 * - 存储池的增加和删除
 * - 网络状态
 * - 卷的添加和删除
 * - 快照的添加和删除
 * - 控制器切换 (EREMCHG)
 * - 副本数变化
 *
 * 触发方式有几种：
 * - 节点网络断开
 * - 超时设定 - RECOVERY_INTERVAL_DEFAULT_VAL
 * - lich health scan
 *
 * @note 确保每个pool的扫描/恢复过程，是单实例的
 * @note 每个节点上的恢复线程，只处理控制器是本节点的数据块，如果发生controller切换，直接跳过
 * @note 每个chunk需要两个parent信息(vol, pool类chunk，其它类型chunk都等于所在资源的控制器fileid）
 *
 * @todo 优化内存使用量 (rec_t)
 * @todo 怎么才能更快地感知到各种故障和变化，并启用恢复过程？
 * @todo 创建卷失败，会导致metadata表里存在垃圾卷，如何识别？
 * @todo 恢复过程，是否需要感知故障域规则？
 *
 * @todo chunk fail
 * @todo check vfm
 *
 * @changelog zhaoercheng 时间：2017/12/9 解决断节点，节点上线不准确的问题，扫描过程失败退出的问题
 *
 * @see diskmd_recovery.c
 */

/*
 * Map a pool recovery status to the label used in the nodectl info files.
 * Any value other than running/waiting/scanning is reported as "suspend".
 */
static inline const char *_recovery_status2str(int status)
{
        return status == __RECOVERY_RUNNING__  ? "running"  :
               status == __RECOVERY_WAITING__  ? "waiting"  :
               status == __RECOVERY_SCANNING__ ? "scanning" :
                                                 "suspend";
}

/*
 * Module-wide recovery state.
 * NOTE(review): neither field is referenced in the visible part of this
 * file; presumably `lock` guards `running_pool_count` - confirm.
 */
typedef struct __recovery_module_t {
        sy_rwlock_t lock;
        int running_pool_count;
} recovery_module_t;

static recovery_module_t __module__;

/*
 * Table of per-pool recovery state (recovery_pool_t entries).
 * __recovery_cmp() matches an entry's name against a string key, so lookups
 * are presumably by pool name - confirm where the table is created.
 */
hashtable_t recovery_tab;
/* Global flag; presumably set once recovery has been started - confirm. */
int is_recovery_started = 0;

/* Forward declarations of static functions defined later in this file. */
static void *recovery_scan_thread_func(void *arg);

static int __recovery_pool_destroy(recovery_pool_t *pool);

/*
 * Pool hash: hash callback that treats the key as a NUL-terminated string
 * (the pool name) and hashes it with hash_str().
 */
static uint32_t __recovery_key(const void *i)
{
        const char *key_str = i;

        return hash_str((char *)key_str);
}

/*
 * Compare callback: v1 is a stored recovery_pool_t, v2 is a pool-name string.
 * Returns strcmp() of the pool's name against the key (0 means match).
 */
static int __recovery_cmp(const void *v1, const void *v2)
{
        return strcmp(((const recovery_pool_t *)v1)->name, (const char *)v2);
}

/**
 * Detect whether the cluster node count differs from the one recorded
 * for this pool.
 *
 * @param pool pool whose node_number is compared against etcd
 * @return 0 if the counts match, EAGAIN if they differ, or the error
 *         returned by etcd_cluster_node_count()
 */
static int recovery_node_count_change(recovery_pool_t *pool)
{
        int ret, node_num = 0;

        ret = etcd_cluster_node_count(&node_num);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (likely(node_num == pool->node_number))
                return 0;

        DWARN("pool %s node:%d change to %d\n", pool->name, pool->node_number, node_num);
        ret = EAGAIN;
err_ret:
        return ret;
}

/**
 * Decide whether a chunk must go through the replica-count adjustment path.
 *
 * A chunk with exactly one replica qualifies unless the cluster runs in solo
 * mode. Any other chunk qualifies only when it has fewer replicas than the
 * target.
 *
 * @param chkinfo chunk to inspect
 * @param repnum  target (destination) replica count
 * @return 1 if the replica count should be adjusted, 0 otherwise
 */
static int __recovery_need_check_replica(const chkinfo_t *chkinfo, int repnum)
{
        if (chkinfo->repnum == 1)
                return cluster_is_solomode() ? 0 : 1;

        return chkinfo->repnum < repnum ? 1 : 0;
}

/**
 * Scan callback: decide whether one chunk needs recovery. If it does, append
 * a rec_t describing the chunk to the pool's scan queue.
 *
 * The parent passed in identifies the resource the chunk belongs to:
 * - pool:    self (differs from the db record)
 * - subpool: fileid of the owning pool
 * - vol:     self (differs from the db record)
 * - subvol:  fileid of the owning volume
 * - raw:     fileid of the owning volume
 *
 * @param _ent      recovery_pool_t * of the pool being scanned
 * @param _parent   const chkid_t * of the owning pool/volume
 * @param _chkinfo  chkinfo_t * of the chunk being checked
 * @param _unintact optional int * holding the number of replicas that are not
 *                  clean, as computed by the caller. When NULL, the replica
 *                  states are inspected here, which is not allowed for raw
 *                  chunks.
 */
STATIC void __recovery_chunk_unintact(void *_ent, void *_parent, void *_chkinfo, void *_unintact)
{
        int ret, i, clean, is_top_chunk;
        recovery_pool_t *pool = _ent;
        scan_mq_t *smq = &pool->smq;
        const chkid_t *parent = _parent;
        chkinfo_t *chkinfo = _chkinfo;
        const reploc_t *reploc;
        rec_t rec;
        int repnum;         // target repnum
        addtion_t addtion;

        DBUG("pool %s volume "CHKID_FORMAT" chunk "CHKID_FORMAT"\n", pool->name, CHKID_ARG(parent), CHKID_ARG(&chkinfo->id));

        // Per-resource data for the parent: target replica counts, EC info and
        // db parent. Presumably this is what __pool_scan_meta() pushed.
        ret = smq->get_addtion(smq, parent, &addtion);
        if (unlikely(ret)) {
                // UNIMPLEMENTED(__DUMP__);
                DWARN("pool %s chunk "CHKID_FORMAT " ret %d\n", pool->name, CHKID_ARG(parent), ret);
                return;
        }

        // Choose the target replica count from the chunk's class.
#if ENABLE_EC
        if (EC_ISEC(&addtion.ec) && eclog_chunk_islog(&chkinfo->id, &addtion.ec)) {
                repnum = addtion.repnum_eclog;
        } else if(chkinfo->id.type == __RAW_CHUNK__) {
                repnum = addtion.repnum_usr;
        } else {
                repnum = addtion.repnum_sys;
        }
#else
        if(chkinfo->id.type == __RAW_CHUNK__) {
                repnum = addtion.repnum_usr;
        } else {
                repnum = addtion.repnum_sys;
        }
#endif

        ANALYSIS_BEGIN(0);

        // The chunk does not belong to this pool or volume.
        if (parent->id != chkinfo->id.id) {
                return;
        }

        // chunk type: pool or vol (the chunk is its own parent)
        is_top_chunk = chkid_cmp(parent, &chkinfo->id) == 0 ? 1 : 0;

        if (_unintact) {
                clean = chkinfo->repnum - *(int *)_unintact;
        } else {
                YASSERT(chkinfo->id.type != __RAW_CHUNK__);

                // Count replicas whose status is 0 and whose node is reachable.
                clean = 0;
                for (i = 0; i < chkinfo->repnum; i++) {
                        reploc = &chkinfo->diskid[i];
                        if (reploc->status) {
                                continue;
                        } else {
                                // TODO
                                ret = network_connect1(&reploc->id);
                                if (unlikely(ret)) {
                                        /* ? Is network_connect1 needed every time? */
                                        smq->offline++;
                                        continue;
                                }
                                clean++;
                        }
                }
        }

        ANALYSIS_END(0, 1000, "chunk_check1");

        ANALYSIS_RESET(0);
        if (clean == 0) {
                DWARN("pool %s chunk "CHKID_FORMAT" lost\n", pool->name, CHKID_ARG(&chkinfo->id));
                CHKINFO_DUMP(chkinfo, D_WARNING);
                smq->lost++;
        } else if ((clean != chkinfo->repnum)
                   || __recovery_need_check_replica(chkinfo, repnum)) {
                // 1. A disk was pulled, so clean != chkinfo->repnum
                // 2. Only m replicas were written, m < n, n being the target repnum

                // The whole record (sizeof(rec)) is written to the scan queue,
                // but chkinfo_buf is only partly filled below. Zero it first so
                // the unused tail and any padding are not stale stack bytes.
                memset(&rec, 0, sizeof(rec));

                // TODO rule 1
                // Every intermediate and leaf chunk of a pool or volume uses
                // chunk0 as its parent; chunk0 uses its pool as its parent.
                // This is mainly to locate the controller that owns it.
                if (is_top_chunk) {
                        YASSERT(chkid_cmp(parent, &addtion.parent));
                        rec.parent = addtion.parent;
                } else {
                        rec.parent = *parent;
                }

                rec.repnum = repnum;

                rec.chkid = chkinfo->id;
                memcpy(rec.chkinfo_buf, chkinfo, CHKINFO_SIZE(chkinfo->repnum));

                DBUG("add record: pool %s chunk "CHKID_FORMAT" parent %s repnum %u %u %u\n", pool->name,
                      CHKID_ARG(&chkinfo->id),
                      id2str(&rec.parent),
                      rec.repnum,
                      chkinfo->repnum,
                      clean);

                ret = smq->write(smq, parent, &rec, sizeof(rec));
                if (unlikely(ret)) {
                        // UNIMPLEMENTED(__DUMP__);
                        DWARN("pool %s chunk "CHKID_FORMAT " ret %d\n", pool->name, CHKID_ARG(parent), ret);
                }

        }

        ANALYSIS_END(0, 1000, "chunk_check2");
}

/*
 * Pool-chunk iterator callback. Same as __recovery_chunk_unintact(), but
 * with no precomputed unintact count, so each replica's state is inspected
 * directly.
 */
STATIC void __recovery_chunk_check(void *_ent, void *_parent, void *_chkinfo)
{
        void *unintact_count = NULL;    /* none known: inspect each replica */

        __recovery_chunk_unintact(_ent, _parent, _chkinfo, unintact_count);
}

/**
 * Tell whether a volume is currently being deleted.
 *
 * Non-volume chunk ids are never reported as deleting. If vol_is_deleting()
 * fails, the failure is logged and the volume is treated as not deleting.
 *
 * @param _chkid chunk id to check
 * @return EAGAIN if the volume is being deleted, 0 otherwise
 */
static int recovery_pool_scan_vol_is_deleting(chkid_t * _chkid)
{
        int ret, deleted = 0;
        chkid_t *chkid = _chkid;

        if (!is_volume(chkid))
                return 0;

        ret = vol_is_deleting(chkid, &deleted);
        if (unlikely(ret)) {
                DWARN("check "CHKID_FORMAT" ret %d\n", CHKID_ARG(chkid), ret);
                return 0;
        }

        // Return the "deleting" result directly instead of routing a
        // successful call (ret == 0) through the GOTO error macro.
        if (deleted) {
                DWARN("check "CHKID_FORMAT" is deleting\n", CHKID_ARG(chkid));
                return EAGAIN;
        }

        return 0;
}

/**
 * Check with the locator whether a chunk's entry still exists.
 *
 * @param chkid chunk to look up
 * @param exist out: 1 if the lookup succeeded, 0 if the locator reported ENOENT
 * @return 0 when *exist was set. Otherwise the locator's error (anything other
 *         than ENOENT), in which case *exist is left untouched.
 *
 * With ENABLE_REPLICA_GC, a missing entry also triggers a best-effort
 * replica_srv_unlink(chkid, -1); its result is only logged.
 */
static int __entry_exist(const chkid_t *chkid, int *exist)
{
        int ret;
        nid_t nid;

        ret = locator_rpc_lookup(chkid, &nid, 1);
        if (likely(ret == 0)) {
                *exist = 1;
                return 0;
        }

        if (ret != ENOENT)
                GOTO(err_ret, ret);

        /* The entry was deleted: the scan for it should stop. */
#if ENABLE_REPLICA_GC
        ret = replica_srv_unlink(chkid, -1);
        if(ret) {
                DWARN("check "CHKID_FORMAT" delete fail, ret %d\n",
                      CHKID_ARG(chkid), ret);
        } else {
                DWARN("check "CHKID_FORMAT" delete success\n",
                      CHKID_ARG(chkid));
        }
#endif
        /* Must not fall through to the "exists" path. */
        *exist = 0;
        return 0;
err_ret:
        return ret;
}

/**
 * Metadata-scan callback for one chunk id of the pool being scanned.
 *
 * Only pool and volume chunks are handled; every other type is skipped.
 * A pool/volume chunk is queued only when all of these hold:
 * - it belongs to this pool;
 * - its first replica location (diskid[0]) is the local node;
 * - it is not a volume that is being deleted.
 * Before queuing, the target replica counts are collected from its file
 * attributes, and the chunk id plus this per-resource "addtion" data is
 * pushed onto the pool's scan queue.
 *
 * @param _arg       recovery_pool_t * of the pool being scanned
 * @param _chkid     chkid_t * of the metadata entry
 * @param _pool_name unused
 * @return 0 when the chunk was queued or skipped; ENOENT when the volume is
 *         being deleted; otherwise the error from a lookup or from the push
 *
 * NOTE(review): every USLEEP_RETRY below names `out` as its first argument.
 * If that is the label taken once retries are exhausted, a chunk that keeps
 * failing (ETIME, or "exists but unreadable") is silently skipped (returns 0)
 * instead of reported to __pool_scan_meta_retry(). Confirm this is intended.
 */
static int __pool_scan_meta(void *_arg, void *_chkid, void *_pool_name)
{
        int ret = 0, retry = 0, exist;
        recovery_pool_t *pool = _arg;
        chkid_t *chkid = _chkid;
        addtion_t addtion;
        fileinfo_t fileinfo;
        char name[MAX_NAME_LEN];
        char _chkinfo[CHKINFO_MAX];
        chkinfo_t *chkinfo;

        (void) _pool_name;

        // TODO addtion is pool-level shared data, so the scans run by the
        // different controllers of the same pool must not run concurrently.
        // addtion.parent is the same as the parent in the db record.

        if (chkid->type == __POOL_CHUNK__ || chkid->type == __VOLUME_CHUNK__) {
                // Also yields the pool name the resource belongs to.
                ret = replica_srv_getparent(chkid, &addtion.parent, name);
                if (ret) {
                        DWARN("pool %s chkid "CHKID_FORMAT" fail, ret %d\n",
                              pool->name, CHKID_ARG(chkid), ret);
                        GOTO(err_ret, ret);
                }

                // Belongs to another pool: not ours to scan.
                if (strcmp(name, pool->name) != 0) {
                        return 0;
                }

                chkinfo = (void *)_chkinfo;

retry:
                ret = md_chunk_getinfo(pool->name, NULL, chkid, chkinfo, NULL);
                if (unlikely(ret)) {
                        if (ret == ETIME) {
                                // TODO Timer expired
                                DWARN(""CHKID_FORMAT" ret %d\n", CHKID_ARG(chkid), ret);
                                USLEEP_RETRY(out, ret, retry, retry, 10, (1000 * 1000));
                        } else {
                                // Distinguish "deleted" (skip) from "still
                                // exists but unreadable" (retry).
                                ret = __entry_exist(chkid, &exist);
                                if (ret) {
                                        GOTO(err_ret, ret);
                                }

                                if (exist) {
                                        ret = EAGAIN;
                                        DWARN(""CHKID_FORMAT" exist\n", CHKID_ARG(chkid));
                                        USLEEP_RETRY(out, ret, retry, retry, 10, (1000 * 1000));
                                } else {
                                        goto out;
                                }
                        }
                }

                // Only the node holding the first replica location scans
                // this resource.
                if (nid_cmp(net_getnid(), &chkinfo->diskid[0].id) != 0) {
                        return 0;
                }

                // A volume being deleted is reported as ENOENT, which
                // __pool_scan_meta_retry() treats as "done".
                ret = recovery_pool_scan_vol_is_deleting(chkid);
                if (unlikely(ret)) {
                        ret = ENOENT;
                        DWARN("pool %s chkid "CHKID_FORMAT" fail, ret %d\n", pool->name, CHKID_ARG(chkid), ret);
                        GOTO(err_ret, ret);
                }

                DINFO("pool %s chkid "CHKID_FORMAT" host %s\n",
                      pool->name, CHKID_ARG(chkid), network_rname(&chkinfo->diskid[0].id));

                ret = stor_ctl_getattr(chkid, &fileinfo);
                if (unlikely(ret)) {
                        DWARN("pool %s chkid "CHKID_FORMAT" fail, ret %d\n",
                              pool->name, CHKID_ARG(chkid), ret);

                        if (ret == EREMCHG) {
                                // Controller moved away: skip.
                                return 0;
                        } else if (ret == ENOENT) {
                                ret = __entry_exist(chkid, &exist);
                                if (ret) {
                                        GOTO(err_ret, ret);
                                }

                                if (exist) {
                                        ret = EAGAIN;
                                        DWARN(""CHKID_FORMAT" exist\n", CHKID_ARG(chkid));
                                        USLEEP_RETRY(out, ret, retry, retry, 10, (1000 * 1000));
                                } else {
                                        goto out;
                                }
                        } else if (ret == ESTALE) {
                                // NOTE(review): same handling as the default branch.
                                GOTO(err_ret, ret);
                        } else {
                                GOTO(err_ret, ret);
                        }
                }

                // Target replica counts for the chunks of this resource.
                addtion.ec = fileinfo.ec;

                addtion.repnum_usr = fileinfo.repnum_usr;
                addtion.repnum_eclog = addtion.ec.m - addtion.ec.k + 1;

                // A repnum_sys of 0 falls back to the global metadata replica count.
                if (fileinfo.repnum_sys == 0) {
                        addtion.repnum_sys = gloconf.metadata_replica;
                } else {
                        addtion.repnum_sys = fileinfo.repnum_sys;
                }

                DINFO("pool %s chkid "CHKID_FORMAT" repnum_usr:%d repnum_sys:%d repnum_eclog:%d\n",
                      pool->name,
                      CHKID_ARG(chkid),
                      addtion.repnum_usr,
                      addtion.repnum_sys,
                      addtion.repnum_eclog);

                ret = pool->smq.push(&pool->smq, pool, chkid, &addtion);
                if (unlikely(ret)) {
                        DWARN("pool %s chkid "CHKID_FORMAT" fail, ret %d\n", pool->name, CHKID_ARG(chkid), ret);
                        GOTO(err_ret, ret);
                }
        } else {
                DBUG("skip "CHKID_FORMAT"\n", CHKID_ARG(chkid));
                return 0;
        }

out:
        return 0;
err_ret:
        DWARN("pool %s chkid "CHKID_FORMAT" fail, ret %d\n", pool->name, CHKID_ARG(chkid), ret);
        return ret;
}

/**
 * Record whether this pool needs a rescan, both locally and in etcd.
 *
 * pool->need_rescan is always updated. The etcd key
 * ETCD_POOLSCAN/<pool> : <local nid> is set to "1" when @rescan is non-zero
 * and deleted otherwise. etcd failures (other than EEXIST on set and ENOENT
 * on delete) are only logged.
 *
 * @param pool    pool to update
 * @param rescan  TRUE to request a rescan, FALSE to clear the request
 * @param reason  caller tag; only logged
 * @param errcode error code; only logged
 */
static void __pool_set_rescan(recovery_pool_t *pool, int rescan, const char *reason, int errcode)
{
        int ret;
        char prefix[MAX_PATH_LEN];
        char key[MAX_NAME_LEN];

        DINFO("pool %s rescan %d reason %s errcode %d\n", pool->name, rescan, reason, errcode);

        pool->need_rescan = rescan;

        // Bounded formatting: a long pool name must not overflow prefix.
        snprintf(prefix, sizeof(prefix), "%s/%s", ETCD_POOLSCAN, pool->name);
        snprintf(key, sizeof(key), "%d", ng.local_nid.id);

        if (rescan) {
                ret = etcd_set_with_ttl(prefix, key, "1", 0);
                if (unlikely(ret)) {
                        if (ret != EEXIST) {
                                DWARN("set path %s key %s ret %d\n", prefix, key, ret);
                        }
                }
        } else {
                ret = etcd_del(prefix, key);
                if (unlikely(ret)) {
                        if (ret != ENOENT) {
                                DWARN("del path %s key %s ret %d\n", prefix, key, ret);
                        }
                }
        }
}

/**
 * Scan one metadata entry with __pool_scan_meta(), retrying on failure.
 *
 * Success, or ENOENT (the resource was deleted), ends the attempt. Any other
 * error is retried after a 1-second sleep, for at most 61 attempts. If every
 * attempt fails, the pool is flagged for a full rescan instead.
 *
 * Modified by hcc on 20180525; see bug#11320 for details.
 *
 * @todo bug#11391
 */
static void __pool_scan_meta_retry(void *_arg, void *_chkid, void *_pool_name)
{
        recovery_pool_t *pool = _arg;
        int ret, attempt = 0;

        DBUG(CHKID_FORMAT", pool %s\n", CHKID_ARG((chkid_t *)_chkid), (char *)_pool_name);

        while (attempt <= 60) {
                ret = __pool_scan_meta(_arg, _chkid, _pool_name);
                /* Scanned, or deleted meanwhile: nothing more to do. */
                if (!ret || ret == ENOENT)
                        return;

                attempt++;
                DWARN("pool %s chkid "CHKID_FORMAT" ret %d retry %d\n",
                      pool->name, CHKID_ARG((chkid_t *)_chkid), ret, attempt);

                sleep(1);
        }

        __pool_set_rescan(pool, TRUE, "__pool_scan_meta_retry", 0);
}

/**
 * Run stor_ctl_vfm_cleanup() on every sub-chunk table of a volume.
 *
 * @param arg1 unused
 * @param arg2 recovery_mq_item_t * whose chkid selects the resource;
 *             ids that are not volume chunks are ignored
 * @return 0 on success or for a non-volume item. Otherwise the error from
 *         md_getattr() or stor_ctl_vfm_cleanup(). ENOENT for a single table
 *         is skipped; EBUSY is retried via USLEEP_RETRY before giving up.
 */
static int __recovery_vfm_cleanup(void *arg1, void *arg2)
{
        int ret, retry = 0;
        uint32_t i, max, chknum;
        fileinfo_t fileinfo;
        chkid_t tid;
        recovery_mq_item_t *item = arg2;
        const volid_t *volid;

        (void) arg1;

        // Check the item before forming a pointer into it.
        YASSERT(arg2);
        volid = &item->chkid;

        if (volid->type != __VOLUME_CHUNK__) {
                goto out;
        }

        DINFO("cleanup "CHKID_FORMAT" begin\n", CHKID_ARG(volid));

        ret = md_getattr(volid, &fileinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        chknum = size2chknum(fileinfo.size, NULL);
        max = chknum2tabnum(chknum);

        // One cleanup call per sub-chunk table; i is unsigned to match max.
        for (i = 0; i < max; i++) {
                tid = *volid;
                tid.idx = i;
                tid.type = __VOLUME_SUB_CHUNK__;
                retry = 0;

        retry:
                ret = stor_ctl_vfm_cleanup(volid, &tid);
                if (unlikely(ret)) {
                        if (ret == ENOENT) {
                                // Skip this table.
                                DBUG("check "CHKID_FORMAT"\n", CHKID_ARG(&tid));
                                continue;
                        } else if (ret == EREMCHG) {
                                GOTO(err_ret, ret);
                        } else if (ret == EBUSY) {
                                DWARN(""CHKID_FORMAT" fail\n", CHKID_ARG(&tid));
                                USLEEP_RETRY(err_ret, ret, retry, retry, 10, (1000 * 1000));
                        } else
                                GOTO(err_ret, ret);
                }

                DBUG(""CHKID_FORMAT" intact\n", CHKID_ARG(&tid));
        }

        DINFO("cleanup "CHKID_FORMAT" end\n", CHKID_ARG(volid));
out:
        return 0;
err_ret:
        DWARN("cleanup "CHKID_FORMAT" fail\n", CHKID_ARG(volid));
        return ret;
}

/**
 * Walk the chunks of one pool or volume and queue those that need recovery.
 *
 * Pool chunks are walked with pool_ctl_chunk_iterator() and
 * __recovery_chunk_check(). Volume chunks are walked with
 * volume_ctl_chunk_unintact() and __recovery_chunk_unintact().
 * After _errno(), EAGAIN is retried with USLEEP_RETRY; other errors are
 * logged and the function still returns 0.
 *
 * @param _ent recovery_pool_t * of the pool being scanned
 * @param _arg chkid_t * of a pool or volume chunk
 * @return 0 normally. ECANCELED from the volume walk. Otherwise the last
 *         error, presumably once USLEEP_RETRY gives up and jumps to err_ret.
 */
static int recovery_pool_scan_chkid(void *_ent, void *_arg)
{
        int ret = 0, retry = 0;
        recovery_pool_t *pool = _ent;
        chkid_t *chkid = _arg;

        DINFO("pool %s begin chunk "CHKID_FORMAT"\n", pool->name , CHKID_ARG(chkid));

        YASSERT(chkid->type == __POOL_CHUNK__ || chkid->type == __VOLUME_CHUNK__);

retry:
        if (chkid->type == __POOL_CHUNK__) {
                ret = pool_ctl_chunk_iterator(chkid, __recovery_chunk_check, pool);
                if (unlikely(ret)) {
                        if (ret == EREMCHG) {
                                //nothing todo
                        } else if (ret == ENODATA) {
                                DERROR("pool %s check "CHKID_FORMAT" fail, ret %d\n", pool->name, CHKID_ARG(chkid), ret);
                        } else if (ret == ENOENT) {
                                DWARN("pool %s check "CHKID_FORMAT" fail, ret %d\n", pool->name, CHKID_ARG(chkid), ret);
                        } else {
                                ret = _errno(ret);
                                if (ret == EAGAIN)  {
                                        DWARN("pool %s check "CHKID_FORMAT" ret %d\n", pool->name, CHKID_ARG(chkid), ret);
                                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                                } else if (ret == ENOENT) {
                                        DERROR("pool %s "CHKID_FORMAT" lost reference\n", pool->name, CHKID_ARG(chkid));
                                        SWARN(0, "%s, "CHKID_FORMAT" lost reference\n",
                                              M_DATA_CHUNK_WARN, CHKID_ARG(chkid));
                                } else
                                        DWARN("pool %s check "CHKID_FORMAT" fail, ret %d\n",pool->name, CHKID_ARG(chkid), ret);
                        }
                }

        } else if (chkid->type == __VOLUME_CHUNK__) {
                ret = volume_ctl_chunk_unintact(chkid, __recovery_chunk_unintact, pool, 0);
                if (unlikely(ret)) {
                        if (ret == EREMCHG) {
                                //nothing todo
                        } else if (ret == ENODATA) {
                                DERROR("pool %s check "CHKID_FORMAT" fail, ret %d\n",pool->name, CHKID_ARG(chkid), ret);
                        } else if (ret == ECANCELED) {
                                DWARN("pool %s check "CHKID_FORMAT" fail, ret %d\n",pool->name, CHKID_ARG(chkid), ret);
                                goto err_ret;
                        } else if (ret == ENOENT) {
                                DWARN("pool %s check "CHKID_FORMAT" fail, ret %d\n", pool->name, CHKID_ARG(chkid), ret);
                        } else {
                                ret = _errno(ret);
                                if (ret == EAGAIN)  {
                                        DWARN("pool %s check "CHKID_FORMAT" ret %d\n", pool->name, CHKID_ARG(chkid), ret);
                                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                                } else if (ret == ENOENT) {
                                        DERROR("pool %s "CHKID_FORMAT" lost reference\n", pool->name, CHKID_ARG(chkid));
                                        SWARN(0, "%s, "CHKID_FORMAT" lost reference\n",
                                              M_DATA_CHUNK_WARN, CHKID_ARG(chkid));
                                } else
                                        DWARN("pool %s check "CHKID_FORMAT" fail, ret %d\n",pool->name, CHKID_ARG(chkid), ret);
                        }
                }
        }


        // rec_t size is logged so memory use can be estimated from the counts.
        // sizeof yields size_t; cast so it matches the %lu conversion.
        DINFO("end chunk "CHKID_FORMAT" pools:%d pool %s recovery:%ju lost:%ju offline:%ju rec_t:%lu ret %d\n",
              CHKID_ARG(chkid),
              recovery_tab->num_of_entries,
              pool->name,
              pool->smq.need_recovery,
              pool->smq.lost,
              pool->smq.offline,
              (unsigned long)sizeof(rec_t),
              ret);

        return 0;
err_ret:
        DWARN("pool %s check "CHKID_FORMAT" fail, ret %d\n", pool->name, CHKID_ARG(chkid), ret);
        return ret;
}

/**
 * Publish the pool's recovery statistics to its nodectl info file
 * (RECOVERY<pool>RECOVERY_INFO).
 *
 * @param pool       pool to report on
 * @param new_status new value for pool->status, or -1 to keep the current one
 *
 * When the (possibly updated) status is scanning or waiting, lastscan is set
 * to the current time and the per-round counters are reset before reporting.
 */
void recovery_dump(recovery_pool_t *pool, int new_status)
{
        scan_mq_t *smq = &pool->smq;
        recovery_mq_t *rmq = &pool->rmq;
        recovery_tp_t *rtp = &pool->rtp;
        char value[MAX_BUF_LEN];
        char path[MAX_NAME_LEN];

        if (new_status != -1) {
                DINFO("pool %s recovery_status: %s -> %s\n",
                      pool->name,
                      _recovery_status2str(pool->status),
                      _recovery_status2str(new_status));

                pool->status = new_status;
        }

        // A new round starts: reset the per-round counters.
        if ((pool->status == __RECOVERY_SCANNING__) || (pool->status == __RECOVERY_WAITING__)) {
                pool->lastscan = gettime();
                rmq->recovery = 0;
                smq->lost = 0;
                smq->offline = 0;
                rtp->check = 0;
                rtp->success = 0;
                rtp->fail = 0;
                rtp->speed = 0;
        }

        // lastscan is cast to unsigned int to match %u.
        snprintf(value, sizeof(value),
                 "status:%s\n"
                 "recovery:%ju\n"
                 "lost:%ju\n"
                 "offline:%ju\n"
                 "success_total:%ju\n"
                 "check:%ju\n"
                 "success:%ju\n"
                 "fail:%ju\n"
                 "speed:%ju\n"
                 "lastscan:%u\n",
                 _recovery_status2str(pool->status),
                 rmq->recovery,
                 smq->lost,
                 smq->offline,
                 rtp->success_total,
                 rtp->check,
                 rtp->success,
                 rtp->fail,
                 rtp->speed,
                 (unsigned int)pool->lastscan);

        // Bounded: a long pool name must not overflow path.
        snprintf(path, sizeof(path), RECOVERY"%s"RECOVERY_INFO, pool->name);

        nodectl_set(path, value);
}

/**
 * Publish a debug snapshot of the pool's queues to RECOVERY<pool>/internals.
 * The snapshot holds the status, the chunk ids in the scan queue's
 * chkid_list, and the length of the recovery queue's file_list.
 *
 * NOTE(review): __count is never incremented, so the "every third call"
 * check is always true and the snapshot is written on every call. It is left
 * as is because one static counter shared by all pools would make such a
 * throttle depend on the order in which pools are dumped.
 * NOTE(review): sbuf is initialised here but not released in this function;
 * if string_buffer_init()/push() allocate, this leaks on every call. Confirm
 * against the string_buffer API.
 */
void recovery_internals_dump(recovery_pool_t *pool)
{
        scan_mq_t *smq = &pool->smq;
        recovery_mq_t *rmq = &pool->rmq;
        char path[MAX_NAME_LEN];
        char value[MAX_BUF_LEN];

        struct list_head *pos, *n;
        recovery_mq_item_t *item;
        char buf[MAX_NAME_LEN];

        static uint64_t __count = 0;

        if (__count % 3 == 0) {
                // Bounded: a long pool name must not overflow path.
                snprintf(path, sizeof(path), "%s%s/internals", RECOVERY, pool->name);

                string_buffer sbuf;
                string_buffer_init(&sbuf);

                list_for_each_safe(pos, n, &smq->chkid_list.list) {
                        item = list_entry(pos, recovery_mq_item_t, hook);
                        memset(buf, 0x0, MAX_NAME_LEN);
                        snprintf(buf, MAX_NAME_LEN, "%s", id2str(&item->chkid));
                        sbuf.push(&sbuf, buf);
                }

                snprintf(value, MAX_BUF_LEN,
                         "status:%s\n"
                         "scan_chkid_list:%d:%s\n"
                         "r_file_list:%d\n",
                         _recovery_status2str(pool->status),
                         smq->chkid_list.count,
                         sbuf.buf,
                         rmq->file_list.count);

                nodectl_set(path, value);
        }
}

/**
 * Remove RECOVERY<pool>/internals, the debug snapshot path that
 * recovery_internals_dump() writes.
 *
 * @param pool pool whose snapshot file is removed
 */
void recovery_internals_unlink(recovery_pool_t *pool)
{
        char path[MAX_NAME_LEN];

        // Bounded: a long pool name must not overflow path.
        snprintf(path, sizeof(path), "%s%s/internals", RECOVERY, pool->name);

        nodectl_unlink(path);
}

/**
 * Publish the pool's counters to its disk-recovery info file
 * (RECOVERY<pool>RECOVERY_DISK_INFO), reported under @new_status.
 *
 * pool->status itself is not changed. When @new_status is waiting, lastscan
 * is set to the current time and the per-round counters are reset first.
 *
 * @param pool       pool to report on
 * @param new_status status label written to the file
 */
void recovery_disk_info_clean(recovery_pool_t *pool, int new_status)
{
        scan_mq_t *smq = &pool->smq;
        recovery_mq_t *rmq = &pool->rmq;
        recovery_tp_t *rtp = &pool->rtp;
        char value[MAX_BUF_LEN];
        char path[MAX_NAME_LEN];

        if (new_status == __RECOVERY_WAITING__) {
                pool->lastscan = gettime();
                rmq->recovery = 0;
                smq->lost = 0;
                smq->offline = 0;
                rtp->check = 0;
                rtp->success = 0;
                rtp->fail = 0;
                rtp->speed = 0;
        }

        // Bound by the real buffer size (was MAX_NAME_LEN, which could
        // truncate the report). lastscan is cast to match %u.
        snprintf(value, sizeof(value),
                 "status:%s\n"
                 "recovery:%ju\n"
                 "lost:%ju\n"
                 "offline:%ju\n"
                 "success_total:%ju\n"
                 "check:%ju\n"
                 "success:%ju\n"
                 "fail:%ju\n"
                 "speed:%ju\n"
                 "lastscan:%u\n",
                 _recovery_status2str(new_status),
                 rmq->recovery,
                 smq->lost,
                 smq->offline,
                 rtp->success_total,
                 rtp->check,
                 rtp->success,
                 rtp->fail,
                 rtp->speed,
                 (unsigned int)pool->lastscan);

        // Bounded: a long pool name must not overflow path.
        snprintf(path, sizeof(path), RECOVERY"%s"RECOVERY_DISK_INFO, pool->name);

        nodectl_set(path, value);
}

/**
 * Compute how many extra replicas a chunk should receive.
 *
 * The target is capped at the cluster node count, and the result is never
 * negative. If the node count cannot be read, 0 is returned.
 *
 * @param chkinfo       chunk whose current replica count is used
 * @param target_repnum desired replica count
 * @return number of replicas to add (>= 0)
 */
static int __recover_get_new_diskid_count(const chkinfo_t *chkinfo, int target_repnum)
{
        int ret, node_count = 0, reachable_target;

        // TODO global
        ret = cluster_countnode(&node_count);
        if (ret) {
                DERROR("cluster count node fail. ret:%d, errmsg:%s\n", ret, strerror(ret));
                GOTO(err_ret, ret);
        }

        reachable_target = min(target_repnum, node_count);
        return max(reachable_target - chkinfo->repnum, 0);
err_ret:
        return 0;
}

/**
 * Add replicas to a chunk. dispatch_newdisk() picks @new_count extra disks
 * (balanced placement; the current disks are passed in, presumably to be
 * excluded). The chunk is then moved onto current + new disks with
 * md_chunk_move().
 *
 * @param pool      pool name used for disk dispatch
 * @param nid       node passed to md_chunk_move()
 * @param srv       controller file id passed to md_chunk_move()
 * @param chkinfo   chunk's current replica locations
 * @param new_count number of replicas requested. It is passed by address to
 *                  dispatch_newdisk(), which may update it.
 * @return 0 on success, or the error from dispatch_newdisk()/md_chunk_move()
 */
static int __recovery_replica(const char *pool, const nid_t *nid, const fileid_t *srv,
                              const chkinfo_t *chkinfo, int new_count)
{
        int  ret, i, disk_count, flag;
        diskid_t disks[LICH_REPLICA_MAX] = {}, new_disks[LICH_REPLICA_MAX] = {};

        YASSERT(chkinfo->repnum <= LICH_REPLICA_MAX);
        disk_count = chkinfo->repnum;
        for (i = 0; i < chkinfo->repnum; i++) {
                disks[i] = chkinfo->diskid[i].id ;
        }

        flag = __NEWDISK_BALANCE__;
        ret = dispatch_newdisk(new_disks, &new_count, new_count, pool, disks, disk_count, flag);
        if (ret) {
                DBUG("dispatch new disk fail, ret:%d, "CHKID_FORMAT"\n",
                       ret, CHKID_ARG(&chkinfo->id));
                GOTO(err_ret, ret);
        }

        // Check the bound BEFORE appending, so an oversized result trips the
        // assertion instead of writing past the end of disks[].
        YASSERT(disk_count + new_count <= LICH_REPLICA_MAX);
        for (i = 0; i < new_count; i++) {
                disks[disk_count++] = new_disks[i];
        }

        ret = md_chunk_move(nid, srv, &chkinfo->id, disks, disk_count);
        if (ret) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * If the recorded chunk has no clean replica left, promote each online
 * replica in __S_CHECK state to __S_CLEAN with md_chunk_set(). This is best
 * effort: each result is only logged.
 *
 * @param rec scan record carrying the chunk's cached chkinfo and its parent
 * @return always 0
 */
static int __recovery_check2clean(const rec_t *rec)
{
        chkinfo_t *chkinfo = (void *)rec->chkinfo_buf;
        const reploc_t *loc;
        int idx, rc;

        // A replica with status 0 counts as clean; one is enough.
        for (idx = 0; idx < chkinfo->repnum; idx++) {
                if (!chkinfo->diskid[idx].status)
                        return 0;
        }

        // No clean replica: fall back to the online check replicas.
        for (idx = 0; idx < chkinfo->repnum; idx++) {
                loc = &chkinfo->diskid[idx];
                if (loc->status != __S_CHECK || !replica_diskonline(chkinfo, idx))
                        continue;

                rc = md_chunk_set(&rec->parent, &chkinfo->id, &loc->id, __S_CLEAN);
                DWARN("set "CHKID_FORMAT" @ "NID_FORMAT" from check to clean, ret: %d\n",
                      CHKID_ARG(&chkinfo->id), NID_ARG(&loc->id), rc);
        }

        return 0;
}

/**
 * Recover a single chunk queued for @pool.
 *
 * Steps:
 * - If there is no clean replica, promote the online check replicas
 *   (__recovery_check2clean).
 * - Verify that the chunk's controller is this node (otherwise EREMCHG).
 * - Handle the case where the replica count is below the target (rec->repnum).
 * - Check that there are enough writable nodes (@todo rack?); otherwise ENOSPC.
 * - Handle inconsistent replicas via md_chunk_check().
 *
 * @param pool   pool the chunk belongs to
 * @param rec    queued record: chunk id, parent id, target replica count
 * @param oflags out: passed through to md_chunk_check(); its meaning is defined
 *               by md_chunk_check (NOTE(review): the caller treats 0 as "nothing
 *               was done" for raw chunks — confirm)
 * @return 0 on success; EREMCHG if the chunk is not controlled by this node;
 *         ENOSPC if fewer writable nodes than replicas; otherwise the error from
 *         the metadata/replica helpers. EBUSY/EAGAIN from md_chunk_check are
 *         retried via USLEEP_RETRY (presumably up to 3 times, 100 ms apart).
 */
static int __recovery_chunk_recover(recovery_pool_t *pool, const rec_t *rec, int *oflags)
{
        int ret, new_diskid_count = 0, count, retry = 0;
        nid_t nid, parentnid;
        chkinfo_t *chkinfo;
        char buf[CHKINFO_MAX];
        chkid_t chkid, srv;

        DBUG("pool %s chunk "CHKID_FORMAT" begin\n", pool->name, CHKID_ARG(&rec->chkid));

        // Works on the chkinfo snapshot stored in rec; always returns 0 today.
        ret = __recovery_check2clean(rec);
        if (ret)
                GOTO(err_ret, ret);

        chkinfo = (void *)buf;
        chkid = rec->chkid;

        ANALYSIS_BEGIN(0);

        // TODO why is chkinfo fetched again here? (presumably because the copy
        // in rec may be stale, e.g. after __recovery_check2clean — confirm)
        ret = md_chunk_getinfo(pool->name, &rec->parent, &chkid, chkinfo, &parentnid);
        if (unlikely(ret)) {
                DINFO("chunk "CHKID_FORMAT" fail ret:%d\n", CHKID_ARG(&chkid), ret);
                GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(0, IO_WARN, "chunk_recover_0");

        ANALYSIS_BEGIN(1);

        // TODO rule 2
        // Pool/volume chunks are driven from the node holding their first
        // replica; any other chunk is driven through its parent (srv = parent).
        if (rec->chkid.type == __POOL_CHUNK__ || rec->chkid.type == __VOLUME_CHUNK__) {
                nid = chkinfo->diskid[0].id;
                srv = chkinfo->id;
        } else {
                nid = parentnid;
                srv = rec->parent;
        }

        // Only the controlling node recovers the chunk.
        if (nid_cmp(net_getnid(), &nid) != 0) {
                ret = EREMCHG;
                DWARN("chunk "CHKID_FORMAT" moved ret:%d \n", CHKID_ARG(&chkinfo->id), ret);
                GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(1, IO_WARN, "chunk_recover_1");

        // Add replicas when the current layout does not match the target count.
        if (__recovery_need_check_replica(chkinfo, rec->repnum)) {
                DBUG("chunk "CHKID_FORMAT" need change replica %u --> %u\n",
                      CHKID_ARG(&chkinfo->id), chkinfo->repnum, rec->repnum);

                ANALYSIS_BEGIN(2);

                new_diskid_count = __recover_get_new_diskid_count(chkinfo, rec->repnum);
                if (new_diskid_count > 0) {
                        ret = __recovery_replica(pool->name, &nid, &srv, chkinfo, new_diskid_count);
                        if (ret)
                                GOTO(err_ret, ret);

                        DBUG("chunk "CHKID_FORMAT" recover success...\n",
                              CHKID_ARG(&chkinfo->id));
                }

                ANALYSIS_QUEUE(2, IO_WARN, "chunk_recover_2");
        }

        ANALYSIS_BEGIN(3);

        // Refuse to proceed when there are fewer writable nodes than replicas.
        // NOTE(review): chkinfo is not refreshed after __recovery_replica, so this
        // uses the replica count read before any replicas were added.
        ret = cluster_countnode_writeable(&count);
        if (ret)
                GOTO(err_ret, ret);

        if (chkinfo->repnum > count) {
                ret = ENOSPC;
                DWARN("chunk repnum：%d "CHKID_FORMAT" node count: %d\n",
                              chkinfo->repnum, CHKID_ARG(&chkid), count);
                GOTO(err_ret, ret);
        }

retry:
        // Repair inconsistent replicas; transient EBUSY/EAGAIN jump back to retry.
        ret = md_chunk_check(&nid, &srv, &chkid, -1, 0, 0, oflags);
        if (unlikely(ret)) {
                ret = _errno(ret);
                YASSERT(ret != EINVAL);

                DBUG("chunk "CHKID_FORMAT" fail, ret: %d\n", CHKID_ARG(&chkid), ret);

                if (ret == EBUSY || ret == EAGAIN) {
                        // TODO
                        USLEEP_RETRY(err_ret, ret, retry, retry, 3, (100 * 1000));
                } else {
                        GOTO(err_ret, ret);
                }
        }

        ANALYSIS_QUEUE(3, IO_WARN, "chunk_recover_3");

        DBUG("pool %s chunk "CHKID_FORMAT" end\n", pool->name, CHKID_ARG(&rec->chkid));
        return 0;
err_ret:
        DWARN("pool %s chunk "CHKID_FORMAT" fail\n", pool->name, CHKID_ARG(&rec->chkid));
        return ret;
}

/*
 * Scan-pool worker: pop chunk ids from pool->smq and hand each one to
 * recovery_pool_scan_chkid(), which decides whether it needs recovery.
 *
 * Pauses for one second while the recovery queue already holds more than
 * RECOVERY_RMQ_MAX_FD_COUNT entries. It exits when asked to stop
 * (__R_TH_STOP__) or when smq is drained (pop returns ENOENT). An EAGAIN or
 * ECANCELED scan result marks the pool for a rescan.
 *
 * @param arg recovery_seg_t describing this worker
 * @return NULL
 */
static void *recovery_scan_thread_func(void *arg)
{
        recovery_seg_t *seg = arg;
        recovery_pool_t *pool = seg->pool;
        scan_mq_t *smq = &pool->smq;
        recovery_mq_t *rmq = &pool->rmq;
        chkid_t chkid;
        int ret, backlog;

        seg->status = __R_TH_RUN__;

        while (seg->status != __R_TH_STOP__) {
                // TODO too many open files: back off while rmq is over its cap
                backlog = rmq->get_length(rmq);
                if (backlog > RECOVERY_RMQ_MAX_FD_COUNT) {
                        DWARN("too many open files: %d\n", backlog);
                        usleep(1000 * 1000);
                        continue;
                }

                ret = smq->pop(smq, &chkid);
                if (unlikely(ret)) {
                        /* ENOENT (queue drained) is the only expected failure. */
                        YASSERT(ret == ENOENT);
                        DINFO("pool %s scan controller done\n", pool->name);
                        break;
                }

                DBUG("pool %s chkid "CHKID_FORMAT" thread %d\n",
                     pool->name, CHKID_ARG(&chkid), seg->idx);

                ANALYSIS_BEGIN(0);

                /* @todo bug #11411 */
                ret = recovery_pool_scan_chkid(pool, &chkid);

                ANALYSIS_END(0, 1000*1000, "__pool_scan_chkid");

                smq->done(smq, &chkid, ret);

                if (unlikely(ret)) {
                        DWARN("chunk "CHKID_FORMAT" ret %d\n", CHKID_ARG(&chkid), ret);

                        if (ret == EAGAIN || ret == ECANCELED)
                                __pool_set_rescan(pool, TRUE, "pool_scan_chkid", ret);
                }
        }

        DBUG("pool %s thread %d done\n", pool->name, seg->idx);

        seg->status = __R_TH_STOPPED__;
        return NULL;
}

/*
 * Recovery-pool worker: takes batches of records from pool->rmq and runs
 * __recovery_chunk_recover() on each record.
 *
 * Each record costs one token from pool->token_bucket. The token is given back
 * (token_bucket_inc) when the queue was empty, when a raw chunk needed no work,
 * or when recovery failed. Each outcome is reported to rtp as CHECK, SUCCESS
 * (ret 0 or EREMCHG) or FAIL.
 *
 * The worker exits when asked to stop (__R_TH_STOP__), or when the queue is
 * empty and the scan pool (stp) has stopped.
 *
 * @param _arg recovery_seg_t describing this worker
 * @return NULL
 */
static void *__recovery_thread_func(void *_arg)
{
        recovery_seg_t *seg = _arg;
        recovery_pool_t *pool = seg->pool;
        recovery_mq_t *rmq = &pool->rmq;
        recovery_tp_t *stp = &pool->stp;
        recovery_tp_t *rtp = &pool->rtp;
        vec_rec_t batch;
        rec_t rec;
        int ret, is_ready, oflags = 0;

        seg->status = __R_TH_RUN__;

        vec_init(&batch);

        while (seg->status != __R_TH_STOP__) {
                token_bucket_consume(&pool->token_bucket, 1, &is_ready, NULL);
                if (!is_ready) {
                        usleep(100);
                        continue;
                }

                /* Refill the local batch only once it has been fully consumed. */
                if (batch.length == 0) {
                        ret = rmq->pop2(rmq, RECOVERY_RMQ_POP_NUMBER, &batch);
                        if (unlikely(ret)) {
                                YASSERT(ret == ENOENT);

                                /* Queue empty and no scanner can add more: finished. */
                                if (stp->is_stopped(stp, NULL))
                                        break;

                                /* Scanners still running: return the token and wait. */
                                token_bucket_inc(&pool->token_bucket, 1);
                                usleep(100*1000);
                                DBUG("ret %d\n", ret);
                                continue;
                        }
                }

                YASSERT(batch.length > 0);
                rec = vec_pop(&batch);

                ANALYSIS_BEGIN(0);

                oflags = 0;
                ret = __recovery_chunk_recover(pool, &rec, &oflags);
                // DINFO("chunk "CHKID_FORMAT" ret %d oflags %d\n", CHKID_ARG(&rec.chkid), ret, oflags);

                if (rec.chkid.type == __RAW_CHUNK__ && oflags == 0) {
                        /*
                         * NOTE(review): counted as CHECK regardless of ret, so a
                         * raw chunk that failed before md_chunk_check ran is not
                         * reported as FAIL — confirm this is intended.
                         */
                        token_bucket_inc(&pool->token_bucket, 1);
                        rtp->result(rtp, __T_RESULT_CHECK__, 1);
                } else if (ret == 0 || ret == EREMCHG) {
                        rtp->result(rtp, __T_RESULT_SUCCESS__, 1);
                } else {
                        token_bucket_inc(&pool->token_bucket, 1);
                        rtp->result(rtp, __T_RESULT_FAIL, 1);
                }

                ANALYSIS_END(0, 1000 * 1000, "__recovery_chunk_recover");
        }

        vec_deinit(&batch);

        DBUG("pool %s thread %d done\n", pool->name, seg->idx);

        seg->status = __R_TH_STOPPED__;
        return NULL;
}

/**
 * Join phase of one recovery cycle: supervise the scan (stp) and recovery
 * (rtp) thread pools of @pool until every queued chunk is handled or the
 * cycle has to be restarted.
 *
 * Roughly once per second the loop:
 * - starts rtp as soon as rmq->recovery becomes non-zero, after setting the
 *   global recovery flag (commented as notifying the balance task to stop);
 * - finishes when nothing was queued and the scan pool has stopped;
 * - updates rtp->speed, dumps state, and periodically persists success_total;
 * - once rtp has stopped, succeeds if every queued chunk was checked or
 *   recovered, otherwise returns EAGAIN (after a 60 s pause if any failed);
 * - returns EAGAIN when the thread setting changes, the debug node-online flag
 *   is set, the node count changes, or a volume change is detected;
 * - returns 0 early when the pool is being removed.
 *
 * On a normal finish, __recovery_vfm_cleanup is applied to every volume via
 * smq->vol_iterator. Both thread pools are stopped on every return path.
 *
 * @param pool pool whose scan thread pool has already been started
 * @return 0 on completion or pool removal, EAGAIN when the cycle must be
 *         restarted, or the error returned by smq->vol_iterator
 */
static int recovery_pool_join(recovery_pool_t *pool)
{
        int ret=0, fill_rate=0, new_thread=0;
        scan_mq_t *smq = &pool->smq;
        recovery_mq_t *rmq = &pool->rmq;
        recovery_tp_t *stp = &pool->stp;
        recovery_tp_t *rtp = &pool->rtp;
        int number_count=0;
        uint64_t temp_success=0;

        DINFO("pool %s\n", pool->name);

        while(1) {
                DBUG("pool %s stp %d %d rtp %d %d smq %d rmq %d recovery %jd (%jd + %jd + %jd)\n",
                     pool->name,
                     stp->thread_num,
                     stp->started,
                     rtp->thread_num,
                     rtp->started,
                     smq->get_length(smq),
                     rmq->get_length(rmq),
                     rmq->recovery,
                     rtp->check,
                     rtp->success,
                     rtp->fail);

                // Recovery threads not running yet: either finish (nothing was
                // queued and scanning is over) or start them once work appears.
                if (!rtp->started) {
                        if (rmq->recovery == 0) {
                                if (stp->is_stopped(stp, NULL)) {
                                        // double check (presumably to catch chunks
                                        // queued while the scan threads were stopping)
                                        if (rmq->recovery == 0) {
                                                DINFO("pool %s done\n", pool->name);
                                                break;
                                        }
                                }
                        } else {
                                // notify balance task to stop
                                recovery_global_flag_set();

                                recovery_dump(pool, __RECOVERY_RUNNING__);

                                int thread = load_thread(pool);
                                rtp->start(rtp, pool, thread, __recovery_thread_func);
                        }

                        sleep(1);
                        continue;
                }

                // TODO speed
                // speed = chunks successfully recovered during the last second
                temp_success = rtp->success;
                sleep(1);
                rtp->speed = rtp->success - temp_success;

                recovery_dump(pool, -1);
                //recovery_internals_dump(pool);        /*core dump when very many volumes.*/

                // NOTE(review): persisted only when success_total is an exact
                // multiple of 1000 at this instant, so many values are skipped.
                if((pool->rtp.success_total % 1000) == 0) {
                        set_recovery_total(pool->name, pool->rtp.success_total, FALSE);
                }

                YASSERT(rtp->check + rtp->success + rtp->fail <= rmq->recovery);

                if (rtp->is_stopped(rtp, NULL)) {
                        DINFO("pool %s recovery %ju check %ju success %ju fail %ju\n",
                              pool->name,
                              rmq->recovery,
                              rtp->check,
                              rtp->success,
                              rtp->fail);

                        if (rtp->check + rtp->success == rmq->recovery) {
                                break;
                        } else {
                                // TODO retry forever?
                                // A rescan is too expensive; keep the degraded state.
                                if (rtp->fail != 0) {
                                        DWARN("pool %s fail %ju\n", pool->name, rtp->fail);
                                        usleep(60 * 1000 * 1000);
                                }
                                ret = EAGAIN;
                                GOTO(err_ret, ret);
                        }
                }

                if (pool->status_remove == __RECOVERY_REMOVEING__) {
                        DWARN("pool %s removing\n", pool->name);
                        ret = 0;
                        GOTO(err_ret, ret);
                }

                // TODO new thread
                // A changed thread setting restarts the cycle (EAGAIN).
                new_thread = load_thread(pool);
                if (new_thread != rtp->thread_num) {
                        DWARN("pool %s thread %d -> %d\n",
                              pool->name, rtp->thread_num, new_thread);
                        ret = EAGAIN;
                        GOTO(err_ret, ret);
                }

                // for debug: trigger a rescan
                if (recovery_node_get(pool)) {
                        DWARN("pool %s node online\n", pool->name);
                        recovery_node_offline_set(pool);
                        ret = EAGAIN;
                        GOTO(err_ret, ret);
                }

                // Check whether the node count has changed (every
                // RECOVERY_NODE_CHANGE_FLAG iterations).
                number_count++;
                if (RECOVERY_NODE_CHANGE_FLAG == number_count) {
                        number_count = 0;
                        ret = recovery_node_count_change(pool);
                        if (unlikely(ret)) {
                                DWARN("pool %s node change ret %d\n", pool->name, ret);
                                ret = EAGAIN;
                                GOTO(err_ret, ret);
                        }
                }

                /**
                 * TODO check deleted or EREMCHG
                 *
                 * @todo If volumes/snapshots are deleted frequently during recovery,
                 * rescans are triggered repeatedly and the pool keeps switching
                 * between the scanning and running states.
                 */
                ret = smq->check_change_volume(smq);
                if (unlikely(ret)) {
                        DWARN("pool %s check_change_volume ret %d\n", pool->name, ret);
                        ret = EAGAIN;
                        GOTO(err_ret, ret);
                }

                fill_rate = load_fill_rate(pool);
                if (fill_rate <= 0) {
                        DWARN("pool %s fill_rate %d\n", pool->name, fill_rate);
                        usleep(1000 * 1000);
                        continue;
                        // goto out_stop;
                }
        }

        // TODO cleanup must only happen after every vol has been recovered successfully
        ret = smq->vol_iterator(smq, __recovery_vfm_cleanup, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("pool %s recovery %ju check %ju success %ju fail %ju\n",
              pool->name,
              rmq->recovery,
              rtp->check,
              rtp->success,
              rtp->fail);

        YASSERT(rmq->recovery == smq->need_recovery);
        YASSERT(rmq->recovery == rtp->check + rtp->success + rtp->fail);

        YASSERT(stp->is_stopped(stp, NULL));
        YASSERT(rtp->is_stopped(rtp, NULL));

        stp->stop(stp);
        rtp->stop(rtp);
        return 0;
err_ret:
        DINFO("pool %s recovery %ju check %ju success %ju fail %ju\n",
              pool->name,
              rmq->recovery,
              rtp->check,
              rtp->success,
              rtp->fail);

        stp->stop(stp);
        rtp->stop(rtp);
        return ret;
}

/*
 * disk_maping iterator callback: count volumes of @_arg (a recovery_pool_t)
 * that have a vfm.
 *
 * Only volume chunks are considered. A volume counts when its parent name
 * equals the pool name, its first replica (diskid[0]) is this node, it is not
 * being deleted, and md_vfm_stat() reports a count > 0. Matches increment
 * pool->vfm_vol_count. Errors are logged and the chunk is skipped.
 */
static void __pool_scan_meta_for_vfm(void *_arg, void *_chkid, void *_pool_name)
{
        recovery_pool_t *pool = _arg;
        chkid_t *chkid = _chkid;
        chkid_t parent;
        char parent_name[MAX_NAME_LEN];
        char info_buf[CHKINFO_MAX];
        chkinfo_t *chkinfo;
        int ret = 0, vfm_count = 0;

        (void)_pool_name;

        DBUG(CHKID_FORMAT", pool %s\n", CHKID_ARG(chkid), (char *)_pool_name);

        /* Only volume chunks are of interest. */
        if (chkid->type != __VOLUME_CHUNK__)
                return;

        ret = replica_srv_getparent(chkid, &parent, parent_name);
        if (ret) {
                DWARN("pool %s chkid "CHKID_FORMAT" ret %d\n", pool->name, CHKID_ARG(chkid), ret);
                GOTO(err_ret, ret);
        }

        /* The volume lives in another pool. */
        if (strcmp(parent_name, pool->name) != 0)
                return;

        chkinfo = (void *)info_buf;
        ret = md_chunk_getinfo(pool->name, NULL, chkid, chkinfo, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* Only the node holding the first replica accounts for the volume. */
        if (nid_cmp(net_getnid(), &chkinfo->diskid[0].id) != 0)
                return;

        ret = recovery_pool_scan_vol_is_deleting(chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = md_vfm_stat(chkid, &vfm_count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (vfm_count <= 0) {
                DBUG("pool %s vol "CHKID_FORMAT" host %s vfm %d\n",
                     pool->name, CHKID_ARG(chkid), network_rname(&chkinfo->diskid[0].id), vfm_count);
                return;
        }

        DINFO("pool %s vol "CHKID_FORMAT" host %s vfm %d\n",
              pool->name, CHKID_ARG(chkid), network_rname(&chkinfo->diskid[0].id), vfm_count);
        pool->vfm_vol_count++;
        return;
err_ret:
        DWARN("pool %s ret %d\n", pool->name, ret);
}

/*
 * Count the volumes of @pool that currently have a vfm on this node by
 * walking the "metadata" mapping with __pool_scan_meta_for_vfm.
 *
 * @param pool          pool to examine; pool->vfm_vol_count is reset and refilled
 * @param vfm_vol_count out: number of matching volumes
 * @return always 0
 */
static int __recovery_check_vfm(recovery_pool_t *pool, int *vfm_vol_count)
{
        *vfm_vol_count = 0;

        /* The callback accumulates into pool->vfm_vol_count. */
        pool->vfm_vol_count = 0;
        disk_maping->iterator("metadata", __pool_scan_meta_for_vfm, pool);

        *vfm_vol_count = pool->vfm_vol_count;

        DBUG("pool %s vfm %d\n", pool->name, pool->vfm_vol_count);
        return 0;
}

/*
 * Wait before the next recovery cycle of @_ent (a recovery_pool_t).
 *
 * The first call for a pool only sets pool->init_flag and returns at once, so
 * the first cycle starts immediately. Later calls wait on pool->sem in slices
 * of 60, in the same unit as recovery_load_interval(), and return when:
 * - the semaphore is posted,
 * - interval / 60 slices have timed out, or
 * - a vfm check run after an earlier timed-out slice finds at least one volume
 *   with a vfm (see __recovery_check_vfm).
 *
 * Any other result from _sem_timedwait1 is treated as fatal (UNIMPLEMENTED).
 */
static void __recovery_sleep(void *_ent)
{
        /* Length of one wait slice; not named "sleep" to avoid shadowing sleep(3). */
        const int slice = 60;
        int ret, interval, max_count, count = 0, vfm_vol_count = 0;
        recovery_pool_t *pool = _ent;

        interval = recovery_load_interval();
        max_count = interval / slice;

        if (pool->init_flag) {
                while (1) {
                        ret = _sem_timedwait1(&pool->sem, slice);
                        if (ret == 0) {
                                break;
                        } else if (ret == ETIMEDOUT) {
                                count += 1;

                                if (count < max_count) {
                                        DBUG("pool %s timeout %d count %d/%d\n",
                                             pool->name, interval, count, max_count);

                                        // TODO check vfm
                                        __recovery_check_vfm(pool, &vfm_vol_count);
                                        if (vfm_vol_count > 0) {
                                                DWARN("pool %s vfm %d\n", pool->name, vfm_vol_count);
                                                break;
                                        }
                                } else {
                                        DINFO("pool %s timeout %d count %d/%d\n",
                                             pool->name, interval, count, max_count);
                                        break;
                                }
                        } else {
                                UNIMPLEMENTED(__DUMP__);
                        }
                }
        } else {
                pool->init_flag = 1;
        }

        DINFO("pool %s init %d status %u interval %d\n",
              pool->name, pool->init_flag, pool->status, interval);
}

/* Release the scan/recovery queues once both thread pools have stopped. */
static void __pool_scan_release(recovery_pool_t *pool)
{
        recovery_tp_t *stp = &pool->stp;
        recovery_tp_t *rtp = &pool->rtp;

        YASSERT(stp->is_stopped(stp, NULL));
        YASSERT(rtp->is_stopped(rtp, NULL));

        pool->smq.deinit(&pool->smq);
        pool->rmq.deinit(&pool->rmq);
}

/*
 * Run one scan + recovery pass over @pool.
 *
 * Stage 1 walks the "metadata" mapping with __pool_scan_meta_retry. If smq is
 * then empty, nothing is started and EINVAL is returned. Otherwise it starts
 * up to RECOVERY_STP_THREAD_NUM scan threads and waits in recovery_pool_join().
 *
 * Returns 0 without doing anything when background recovery is disabled in
 * gloconf.
 *
 * NOTE(review): the "nothing to recover" path returns without calling
 * smq/rmq deinit, unlike the other paths — confirm nothing is left behind.
 *
 * @return 0 on success or when disabled, EINVAL when nothing was found,
 *         otherwise the error from recovery_pool_join()
 */
static int __pool_scan_and_recover(recovery_pool_t *pool)
{
        int ret, count, scan_thread;
        scan_mq_t *smq = &pool->smq;
        recovery_tp_t *stp = &pool->stp;
        recovery_tp_t *rtp = &pool->rtp;

        if (gloconf.background_recovery == 0) {
                DERROR("recovery disabled\n");
                return 0;
        }

        /* stage 1: scan meta */
        disk_maping->iterator("metadata", __pool_scan_meta_retry, pool);

        count = smq->get_length(smq);
        if (count == 0) {
                DINFO("pool %s nothing to recover\n", pool->name);
                return EINVAL;
        }

        DINFO("pool %s scan meta %d\n",pool->name, count);

        YASSERT(!stp->started);
        YASSERT(!rtp->started);

        /* fork: never more scan threads than queued entries */
        scan_thread = _min(count, RECOVERY_STP_THREAD_NUM);
        stp->start(stp, pool, scan_thread, recovery_scan_thread_func);

        /* join */
        ret = recovery_pool_join(pool);
        if (unlikely(ret)) {
                DWARN("pool %s ret:%d\n", pool->name, ret);
                GOTO(err_fd, ret);
        }

        __pool_scan_release(pool);
        return 0;
err_fd:
        __pool_scan_release(pool);
        return ret;
}

/**
 * 每个扫描周期分两个阶段：
 * - scan（会扫描全pool的数据，有可能需要花费较长时间）
 * - recovery
 *
 * @note 删除pool后，需要能够从该线程退出，并且退出所有工作线程
 *
 * @param _ent
 * @return
 */
static void *__recovery_pool_worker(void *_ent)
{
        int ret, done = FALSE, loop_count = 0;
        recovery_pool_t *pool = _ent;
        struct timeval t1, t2;

        DINFO("enter@ pool %s\n", pool->name);

        while (!done) {
                // 一个恢复周期

                // TODO notify balance task to work
                recovery_global_flag_post();

                // TODO recovery_set_off(pool);

                recovery_dump(pool, __RECOVERY_WAITING__);

                __recovery_sleep(pool);

                recovery_config_dump(pool);

                loop_count++;

                _gettimeofday(&t1, NULL);

                // #bug 11496 generate disk_fill_rate
                common_load_fill_rate(pool->name, NULL, 0);
                common_load_fill_rate(pool->name, NULL, 1);

                while (!done) {
                        DINFO("begin@ pool %s status %d loop %d\n",
                              pool->name, pool->status_remove, loop_count);

                        if (pool->status_remove == __RECOVERY_REMOVEING__) {
                                __recovery_pool_destroy(pool);
                                pool = NULL;
                                done = TRUE;
                                DWARN("break@ pool %s removed\n", pool->name);
                                break;
                        }

                        __pool_set_rescan(pool, FALSE, "pre", 0);

#if 0
                        int fill_rate = load_fill_rate(pool);
                        if (fill_rate <= 0) {
                                DWARN("pool %s fill_rate %d must be greater than zero\n",
                                      pool->name, fill_rate);
                                break;
                        }
#endif

                        ret = etcd_cluster_node_count(&pool->node_number);
                        if (unlikely(ret)) {
                                DWARN("continue@ pool %s ret %d\n", pool->name, ret);
                                usleep(1000 * 1000);
                                continue;
                        }

                        recovery_dump(pool, __RECOVERY_SCANNING__);

                        ret = __pool_scan_and_recover(pool);
                        if (unlikely(ret)) {
                                DBUG("pool %s ret %d\n", pool->name, ret);
                                if (ret == EAGAIN) {
                                        DWARN("continue@ pool %s ret %d\n", pool->name, ret);
                                        usleep(3 * 1000 * 1000);
                                        continue;
                                } else {
                                        DWARN("break@ pool %s ret %d\n", pool->name, ret);
                                }
                        }

                        if (pool->status_remove == __RECOVERY_REMOVEING__) {
                                continue;
                        } else if (pool->need_rescan){
                                DWARN("continue@ pool %s rescan\n", pool->name);
                                usleep(30 * 1000 * 1000);
                                continue;
                        } else {
                                DINFO("break@ pool %s success\n", pool->name);
                                break;
                        }
                }

                if (pool) {
                        set_recovery_total(pool->name, pool->rtp.success_total, FALSE);

                        __pool_set_rescan(pool, FALSE, "post", 0);

                        _gettimeofday(&t2, NULL);
                        int64_t used = _time_used(&t1, &t2) / 1000000;

                        DINFO("end@ pool %s used %jd seconds\n", pool->name, used);
                }
        }

        if (pool) {
                DWARN("exit@ pool %s\n", pool->name);
        }

        return NULL;
}

/*
 * Allocate and initialise the recovery state for pool @name.
 *
 * Initialises the wake-up semaphore, the scan/recovery queues and the two
 * thread-pool descriptors (stp, rtp). It folds the persisted per-disk recovery
 * total into the per-node total. The new object is not inserted into
 * recovery_tab here.
 *
 * @param pool out: the new object, or NULL on failure
 * @param name pool name (copied into the object)
 * @return 0 on success, otherwise an errno
 */
static int __pool_create(recovery_pool_t **pool, const char *name) {
        int ret;
        recovery_pool_t *_pool;

        *pool = NULL;

        ret = ymalloc((void **)&_pool, sizeof(recovery_pool_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(_pool, 0x0, sizeof(recovery_pool_t));

        // NOTE(review): unbounded copy; assumes name always fits in _pool->name.
        strcpy(_pool->name, name);
        _pool->lastscan = gettime();

        _pool->init_flag = 0;
        _pool->status = __RECOVERY_WAITING__;
        _pool->need_rescan = FALSE;

        token_bucket_init(&_pool->token_bucket, "recovery", 0, 0, 0, 0, 0);

        ret = sem_init(&_pool->sem, 0, 0);
        if (ret < 0) {
                ret = errno;
                GOTO(err_free, ret);
        }

        /* From here on the semaphore is live and must be destroyed on failure. */
        ret = recovery_immediately(_pool);
        if (unlikely(ret))
                GOTO(err_sem, ret);

        ret = scan_mq_init(&_pool->smq, _pool->name);
        if (unlikely(ret))
                GOTO(err_sem, ret);

        ret = recovery_mq_init(&_pool->rmq);
        if (unlikely(ret))
                GOTO(err_sem, ret);

        ret = recovery_tp_init(&_pool->stp, "stp");
        if (unlikely(ret))
                GOTO(err_sem, ret);

        ret = recovery_tp_init(&_pool->rtp, "rtp");
        if (unlikely(ret))
                GOTO(err_sem, ret);

        {
                // TODO move disk to node
                // Merge the persisted disk total into the node total, then reset it.
                uint64_t node_count = get_recovery_total(_pool->name, FALSE);
                uint64_t disk_count = get_recovery_total(_pool->name, TRUE);

                _pool->rtp.success_total = node_count + disk_count;

                set_recovery_total(_pool->name, _pool->rtp.success_total, FALSE);
                set_recovery_total(_pool->name, 0, TRUE);

                DINFO("pool %s node_count %ju disk_count %ju\n",
                      _pool->name, node_count, disk_count);
        }

        recovery_disk_info_clean(_pool, __RECOVERY_WAITING__);

        *pool = _pool;

        return 0;
err_sem:
        sem_destroy(&_pool->sem);
err_free:
        yfree((void **)&_pool);
err_ret:
        return ret;
}

/*
 * Unregister @pool from recovery_tab and free it with its resources
 * (on-disk pool files, semaphore, smq, rwlocks).
 *
 * In this file it is called from the pool's own worker thread once the pool
 * is flagged for removal. @pool is freed and must not be used afterwards.
 *
 * @return always 0
 */
static int __recovery_pool_destroy(recovery_pool_t *pool)
{
        int ret;
        recovery_pool_t *_pool = NULL;

        /*
         * __recovery_worker__ walks recovery_tab (hash_iterate_table_entries with
         * __pool_iter) under the write side of __module__.lock, and that walk is
         * what wakes this thread for removal. Take the same lock while unlinking
         * so the entry is not removed in the middle of that walk.
         */
        ret = sy_rwlock_wrlock(&__module__.lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        hash_table_remove(recovery_tab, pool->name, (void **)&_pool);

        sy_rwlock_unlock(&__module__.lock);

        YASSERT(pool == _pool);

        DINFO("remove pool %s\n", pool->name);

        recovery_delete_all_pool_file(pool);

        sem_destroy(&pool->sem);

        pool->smq.destroy(&pool->smq);

        sy_rwlock_destroy(&pool->stp.rwlock);
        sy_rwlock_destroy(&pool->rtp.rwlock);

        sy_rwlock_destroy(&pool->smq.rwlock);
        sy_rwlock_destroy(&pool->rmq.rwlock);

        yfree((void **)&_pool);

        return 0;
}

static int __pool_add(const char *name) {
        int ret;
        recovery_pool_t *pool;

        ret = __pool_create(&pool, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = hash_table_insert(recovery_tab, (void *)pool, pool->name, 0);
        if (unlikely(ret))
                GOTO(err_pool, ret);

        ret = sy_thread_create(__recovery_pool_worker, pool);
        if (unlikely(ret))
                GOTO(err_hash, ret);

        DINFO("pool %s added\n", name);
        return 0;

err_hash:
        hash_table_remove(recovery_tab, name, NULL);
err_pool:
        yfree((void **)&pool);
err_ret:
        return ret;
}

/*
 * recovery_tab iteration callback (run with __module__.lock held for writing).
 *
 * Counts pools that are not in __RECOVERY_WAITING__ into
 * __module__.running_pool_count. A tracked pool that no longer appears in
 * @arg1 (the current pool list) is flagged __RECOVERY_REMOVEING__, and its
 * worker is woken so that it can tear itself down.
 *
 * @param arg1 struct list_head * of existing pool names
 * @param arg2 recovery_pool_t * being visited
 * @return always 0
 */
static int __pool_iter(void *arg1, void *arg2) {
        struct list_head *existing = arg1;
        recovery_pool_t *pool = arg2;

        if (pool->status != __RECOVERY_WAITING__)
                __module__.running_pool_count += 1;

        if (system_pool_list_exists(existing, pool->name))
                return 0;

        // TODO is deleted pool
        DWARN("pool %s deleted\n", pool->name);

        pool->status_remove = __RECOVERY_REMOVEING__;
        recovery_wakeup_one_pool(pool->name, "deleted");

        // TODO STOP diskmd_recovery.c, if any

        return 0;
}

/*
 * _dir_iterator callback: remove the per-pool directory @parent/@name when
 * @name is not one of the existing pools in @opaque.
 *
 * It also unregisters the pool's "immediately" control. Entries that are not
 * directories are ignored. Paths that would not fit the buffer are skipped
 * with a warning instead of overflowing it.
 *
 * @param parent directory being scanned
 * @param name   entry name (a pool name)
 * @param opaque struct list_head * of existing pool names
 * @return 0, or the error from _delete_path()
 */
static int __pool_check(const char *parent, const char *name, void *opaque)
{
        int ret, len;
        struct list_head *pool_list = opaque;
        char path[MAX_NAME_LEN];

        if (system_pool_list_exists(pool_list, name))
                return 0;

        len = snprintf(path, sizeof(path), "%s/%s", parent, name);
        if (len < 0 || (size_t)len >= sizeof(path)) {
                DWARN("skip gc, path too long %s/%s\n", parent, name);
                return 0;
        }

        if (!_is_dir(path))
                return 0;

        DWARN("gc dir %s/%s\n", parent, name);
        recovery_immediately_unregister(name);

        ret = _delete_path(path);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Garbage-collect per-pool state left behind by pools that no longer exist,
 * e.g. a pool deleted while this node was offline.
 *
 * Each directory below is scanned with __pool_check(), which removes every
 * sub-directory whose name is not in @pool_list:
 * - /dev/shm/lich4/nodectl/diskstat
 * - /dev/shm/lich4/nodectl/recovery
 * - /dev/shm/lich4/nodectl/balance
 * - /opt/fusionstack/data/recovery
 *
 * This is best-effort: scan errors are only logged at debug level.
 *
 * @param pool_list pools that currently exist
 * @return always 0
 */
static int __nodectl_gc(struct list_head *pool_list)
{
        static const char *const gc_dirs[] = {
                "/dev/shm/lich4/nodectl/diskstat",
                "/dev/shm/lich4/nodectl/recovery",
                "/dev/shm/lich4/nodectl/balance",
                "/opt/fusionstack/data/recovery",
        };
        size_t idx;

        for (idx = 0; idx < sizeof(gc_dirs) / sizeof(gc_dirs[0]); idx++) {
                int rc = _dir_iterator(gc_dirs[idx], __pool_check, pool_list);
                if (unlikely(rc)) {
                        DBUG("ret %d\n", rc);
                }
        }

        return 0;
}

/*
 * One reconciliation pass between the cluster's pools and recovery_tab.
 *
 *   A = pools that currently exist (system_pool_list)
 *   B = pools tracked in recovery_tab
 *   A - B: new pools     -> __pool_add() creates them and starts a worker each
 *   B - A: deleted pools -> __pool_iter() flags them for removal
 *
 * It also garbage-collects stale per-pool directories (__nodectl_gc) and calls
 * diskmd_recovery_check() for every existing pool. running_pool_count is
 * recomputed under __module__.lock.
 *
 * @param _ent unused
 * @return 0 on success, otherwise the first error encountered
 */
STATIC int __recovery_worker__(void *_ent) {
        int ret;
        struct list_head pool_list;
        struct list_head *pos, *n;
        pool_name_t *pool_name;

        (void)_ent;

        ret = system_pool_list(&pool_list);
        if (unlikely(ret)) {
                DWARN("ret %d\n", ret);
                GOTO(err_ret, ret);
        }

        ret = __nodectl_gc(&pool_list);
        if (unlikely(ret)) {
                DERROR("ret %d\n", ret);
                GOTO(err_free, ret);
        }

        /* A - B: start tracking every pool that is not in recovery_tab yet. */
        list_for_each_safe(pos, n, &pool_list) {
                pool_name = list_entry(pos, pool_name_t, hook);

                diskmd_recovery_check(pool_name->pool);

                if (hash_table_find(recovery_tab, (void *)pool_name->pool) != NULL)
                        continue;

                ret = __pool_add(pool_name->pool);
                if (unlikely(ret))
                        GOTO(err_free, ret);
        }

        /* B - A, plus a fresh count of pools that are busy. */
        ret = sy_rwlock_wrlock(&__module__.lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        __module__.running_pool_count = 0;
        hash_iterate_table_entries(recovery_tab, __pool_iter, &pool_list);

        sy_rwlock_unlock(&__module__.lock);

        list_free(&pool_list, yfree);
        return 0;
err_free:
        list_free(&pool_list, yfree);
err_ret:
        return ret;
}

/*
 * Long-running module thread: after an initial 60 s delay it runs
 * __recovery_worker__() once every 60 s, forever. is_recovery_started is set
 * after the first pass.
 *
 * @param _ent forwarded to __recovery_worker__ (unused there)
 * @return never returns in practice
 */
static void *__recovery_worker(void *_ent)
{
        /* Wait for the other nodes to start before the first pass. */
        sleep(60);
        DINFO("enter recovery thread@ pool number %d\n", recovery_tab->num_of_entries);

        for (;;) {
                __recovery_worker__(_ent);

                is_recovery_started = 1;

                /* TODO: a pass every 60 s may be too frequent. */
                sleep(60);
        }

        DERROR("exit recovery thread@ pool number %d\n", recovery_tab->num_of_entries);
        return NULL;
}

/*
 * Report whether any recovery is in progress on this node.
 *
 * @return 1 if at least one pool was counted as not waiting during the last
 *         reconciliation pass; otherwise the result of
 *         diskmd_recovery_is_running()
 */
int recovery_is_running()
{
        int ret, pools_busy;

        ret = sy_rwlock_rdlock(&__module__.lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        pools_busy = (__module__.running_pool_count > 0);

        sy_rwlock_unlock(&__module__.lock);

        return pools_busy ? 1 : diskmd_recovery_is_running();
}

/**
 * Initialise the recovery module and start its background thread.
 *
 * @checklist
 * - A recovery cycle is not reentrant; each cycle has a scan phase and a
 *   recovery phase.
 * - Recovery covers every chunk of every dir and volume.
 * - Control parameters: node level (target bandwidth).
 *
 * Order matters: config, the pool table and the module lock must all exist
 * before __recovery_worker is started.
 *
 * @return 0 on success, otherwise an errno
 */
int recovery_init()
{
        int ret;

        is_recovery_started = 0;

#ifdef RECOVERY_GLOBAL_FLAG
        recovery_global_flag_init();
#endif

        ret = recovery_config_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        recovery_tab = hash_create_table(__recovery_cmp, __recovery_key, "recovery_table");
        if (unlikely(recovery_tab == NULL)) {
                ret = ENOMEM;
                DERROR("ret (%d) %s\n", ret, strerror(ret));
                GOTO(err_ret, ret);
        }

        /* A lock init failure is treated as fatal. */
        ret = sy_rwlock_init(&__module__.lock, "r_mod_lock");
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        __module__.running_pool_count = 0;

        ret = sy_thread_create2(__recovery_worker, NULL, "recovery_init");
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if 0
        ret = recovery_out_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        return 0;
err_ret:
        return ret;
}
